# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABCMeta, abstractmethod
import sympy as sm
from hysop.tools.numpywrappers import npw
from hysop.tools.htypes import check_instance, to_tuple, first_not_None, InstanceOf
from hysop.tools.decorators import debug
from hysop.tools.sympy_utils import exponent, subscript, partial
from hysop.constants import DirectionLabels, SpaceDiscretization
from hysop.parameters.tensor_parameter import TensorParameter
from hysop.symbolic.relational import Assignment
from hysop.core.memory.memory_request import MemoryRequest
from hysop.core.graph.graph import op_apply
from hysop.topology.cartesian_descriptor import CartesianTopologyDescriptors
from hysop.fields.continuous_field import Field, ScalarField
from hysop.operator.base.spectral_operator import SpectralOperatorBase
[docs]
class SpaceDerivativeBase(metaclass=ABCMeta):
"""
Common implementation interface for derivative operators.
"""
@debug
def __new__(
cls,
F,
dF,
A=None,
derivative=None,
direction=None,
variables=None,
name=None,
pretty_name=None,
require_tmp=None,
**kwds,
):
return super().__new__(
cls,
name=name,
pretty_name=pretty_name,
input_fields=None,
output_fields=None,
input_params=None,
**kwds,
)
@debug
def __init__(
self,
F,
dF,
A=None,
derivative=None,
direction=None,
variables=None,
name=None,
pretty_name=None,
require_tmp=None,
**kwds,
):
"""
SpaceDerivative operator base.
Compute the derivative of a field in a given direction
on a given backend, possibly scaled by another field/parameter/value.
dF = A * dF/dxj
Parameters
----------
F: hysop.field.continuous_field.ScalarField
Continuous field as input.
dF: hysop.field.continuous_field.ScalarField
Continuous field to be written.
Some backend may allow inplace differentiation.
A: numerical value, ScalarParameter or ScalarField, optional
Scaling for convenience, defaults to 1.
derivative: int or tuple, optional
Which derivative to generate, defaults to (0,)*(dim-1)+(1,).
ie. first order derivative in X axis.
If integer is given, the derivative is taken in given direction.
direction: int, optional
Directions in which to take the derivative.
Defaults to None.
Should be None if derivative is a tuple.
require_tmp: bool, optional
Should this operator generate a tmp array ?
Default to True if F is dF (inplace computation) else False.
variables: dict
Dictionary of fields as keys and topology descriptors as values.
name: str, optional
Name of this operator
pretty_name: str, optional
Pretty name of this operator.
kwds: dict, optional
Base class keyword arguments.
Notes
-----
There is two way to build a derivative:
(1) derivative(int) + direction(int) gives:
=> derivative=(0,0,0,0,kd,0,0,0)
where the index of kd is direction
and kd=derivative
(2) derivative(tuple) + direction(None) gives:
=> derivative=(k0,...,kn)
"""
assert derivative is not None
A = first_not_None(A, 1)
check_instance(F, ScalarField)
check_instance(dF, ScalarField, allow_none=True)
if isinstance(derivative, tuple):
assert len(derivative) == F.dim
else:
direction = first_not_None(direction, 0)
_derivative = [
0,
] * F.dim
_derivative[direction] = derivative
derivative = tuple(_derivative)
check_instance(derivative, tuple, size=F.dim, minval=0)
nz_derivatives = tuple(x for x in derivative if (x > 0))
if len(nz_derivatives) == 1:
directional_derivative = nz_derivatives[0]
_direction = derivative.index(directional_derivative)
if direction is not None:
assert _direction == direction
else:
direction = _direction
else:
assert direction is None
directional_derivative = None
expr = F.s()
for i, xi in enumerate(F.domain.frame.coords):
if derivative[i] > 0:
expr = expr.diff(xi, derivative[i])
_dF = F.from_sympy_expression(
expr=expr, space_symbols=F.domain.frame.coords, is_tmp=True
)
default_name = _dF.name
default_pretty_name = _dF.pretty_name
if dF is None:
dF = _dF
check_instance(derivative, tuple, size=F.dim, minval=0)
check_instance(direction, int, minval=0, maxval=F.dim - 1, allow_none=True)
check_instance(directional_derivative, int, minval=0, allow_none=True)
name = first_not_None(name, default_name)
pretty_name = first_not_None(pretty_name, default_pretty_name)
variables = first_not_None(variables, {})
check_instance(name, str)
check_instance(pretty_name, str)
check_instance(variables, dict, keys=Field, values=CartesianTopologyDescriptors)
input_fields = {F: variables.get(F, None)}
output_fields = {dF: variables.get(dF, input_fields[F])}
input_params = set()
is_inplace = dF is F
require_tmp = first_not_None(require_tmp, is_inplace)
scale_by_field, scale_by_parameter, scale_by_value = (False, False, False)
if isinstance(A, ScalarField):
input_fields[A] = variables.get(A, input_fields[F])
scale_by_field = True
elif isinstance(A, TensorParameter):
input_params.update({A})
scale_by_parameter = True
elif isinstance(A, (float, int, npw.number, sm.Basic)):
scale_by_value = (A != 1) and (A != 1.0)
else:
msg = f"Unknown type for scaling parameter {type(A)}."
raise TypeError(A)
super().__init__(
name=name,
pretty_name=pretty_name,
input_fields=input_fields,
output_fields=output_fields,
input_params=input_params,
**kwds,
)
self.Fin = F
self.Fout = dF
self.A = A
self.derivative = derivative
self.directional_derivative = directional_derivative
self.direction = direction
self.is_inplace = is_inplace
self.require_tmp = require_tmp
self.scale_by_field = scale_by_field
self.scale_by_parameter = scale_by_parameter
self.scale_by_value = scale_by_value
[docs]
@debug
def discretize(self):
if self.discretized:
return
super().discretize()
self.dFin = self.get_input_discrete_field(self.Fin)
self.dFout = self.get_output_discrete_field(self.Fout)
A = self.A
if A in self.input_discrete_fields:
self.dA = self.input_discrete_fields[A]
else:
self.dA = A
[docs]
@debug
def get_work_properties(self):
requests = super().get_work_properties()
if self.require_tmp:
request = MemoryRequest.empty_like(
a=self.dFout,
nb_components=1,
shape=self.dFout.compute_resolution,
backend=self.backend,
)
requests.push_mem_request("tmp", request)
return requests
[docs]
@debug
def setup(self, work):
super().setup(work)
if work is None:
raise ValueError("work is None.")
if self.require_tmp:
(self.dtmp,) = work.get_buffer(self, "tmp")
[docs]
class FiniteDifferencesSpaceDerivativeBase(SpaceDerivativeBase):
def __new__(cls, **kwds):
return super().__new__(cls, **kwds)
def __init__(self, **kwds):
super().__init__(**kwds)
directional_derivative, direction = self.directional_derivative, self.direction
msg = "FiniteDifferencesSpaceDerivative only supports directional derivatives."
assert isinstance(direction, int), msg
assert isinstance(directional_derivative, int), msg
[docs]
@classmethod
def default_method(cls):
dm = super().default_method()
dm.update({SpaceDiscretization: 2})
return dm
[docs]
@classmethod
def available_methods(cls):
am = super().available_methods()
am.update({SpaceDiscretization: InstanceOf(int)})
return am
[docs]
@debug
def handle_method(self, method):
super().handle_method(method)
if not hasattr(self, "space_discretization"):
space_discretization = method.pop(SpaceDiscretization)
assert 2 <= space_discretization, space_discretization
assert space_discretization % 2 == 0, space_discretization
self.space_discretization = space_discretization
[docs]
class SpectralSpaceDerivativeBase(SpectralOperatorBase, SpaceDerivativeBase):
@debug
def __new__(cls, testing=False, **kwds):
kwds["require_tmp"] = False
return super().__new__(cls, **kwds)
@debug
def __init__(self, testing=False, **kwds):
kwds["require_tmp"] = False
super().__init__(**kwds)
F, dF = self.Fin, self.Fout
derivative = self.derivative
tg = self.new_transform_group()
axes = tuple(i for (i, di) in enumerate(derivative[::-1]) if (di > 0))
if testing and not axes:
axes = tuple(range(F.dim))
elif not axes:
msg = f"No transform axes found, got derivative={derivative}."
raise RuntimeError(msg)
Ft = tg.require_forward_transform(F, axes=axes, custom_output_buffer="auto")
Bt = tg.require_backward_transform(
dF, axes=axes, custom_input_buffer="auto", matching_forward_transform=Ft
)
assert Ft.output_dtype == Bt.input_dtype
dFt = Ft.s
assert len(derivative) == F.dim
for i, di in enumerate(derivative):
if di > 0:
dFt = dFt.diff(Ft.s.all_vars[i], di)
expr = Assignment(Bt.s, dFt)
kds = tg.push_expressions(expr)
self.Ft = Ft
self.Bt = Bt
self.tg = tg
self.kds = kds
self.expr = expr
[docs]
def discretize(self):
super().discretize()
dkds, nd_dkds = (), ()
for kd in self.kds:
_, dkd, nd_dkd = self.tg.discrete_wave_numbers[kd]
dkds += (dkd,)
nd_dkds += (nd_dkd,)
self.dkds = dkds
self.nd_dkds = nd_dkds